package com.atguigu.flink.state;

import com.atguigu.flink.function.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Created by Smexy on 2023/11/15
 *
 * Concatenates the words received so far and then outputs the result.
 */
public class Demo1_RawState {

    /**
     * Builds and runs the streaming job: reads text lines from a socket, maps every
     * line to the string form of all lines accumulated so far, and passes that string
     * to a sink which prints it (or throws when it contains "x").
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("hadoop102", 8888)
           .map(new MyMapFunction())
           .addSink(new FailingPrintSink());

        try {
            env.execute();
        } catch (Exception e) {
            // Any exception thrown by execute() is only reported here, not rethrown.
            e.printStackTrace();
        }
    }

    /**
     * Sink that prints each incoming value to standard output, except that a value
     * containing "x" triggers a deliberately thrown exception instead.
     */
    private static class FailingPrintSink implements SinkFunction<String> {

        /**
         * Handles one record.
         *
         * @param value   the record to print
         * @param context sink context; not used
         * @throws Exception a {@link RuntimeException} when {@code value} contains "x"
         */
        @Override
        public void invoke(String value, Context context) throws Exception {
            boolean simulateFailure = value.contains("x");
            if (simulateFailure) {
                // Manually simulated failure.
                throw new RuntimeException("出故障了...");
            }
            System.out.println(value);
        }
    }

    /**
     * Accumulates every received word and emits the string form of the whole
     * accumulated list for each input.
     */
    private static class MyMapFunction implements MapFunction<String, String> {

        // The state lives in an ordinary Java collection rather than in a state type
        // provided by Flink.
        // NOTE(review): presumably this means the contents are not restored after a
        // failure — confirm.
        private final List<String> receivedWords = new ArrayList<>();

        /**
         * Appends {@code value} to the accumulated list and returns the list rendered
         * via {@code toString()}.
         *
         * @param value the newly received word
         * @return the string form of all words received so far, including this one
         */
        @Override
        public String map(String value) throws Exception {
            // Write the state.
            receivedWords.add(value);
            // Read the state.
            String accumulated = receivedWords.toString();
            return accumulated;
        }
    }
}
